Graylog Security 6.3 內建異常檢測模組,利用 OpenSearch 的機器學習引擎對日誌數據進行行為基準分析與異常偵測,具備以下特性:
系統在啟用後,根據「訓練期間(Training Period)」內的正常流量建立行為基準。此基準含多維度統計(如事件總量、平均頻率、峰值時間),使後續異常偵測更具上下文意義。
系統以固定秒數為間隔執行批次檢測,典型值為 300 秒。間隔越短,偵測越即時,但對資源需求更高。
可針對不同索引與應用場景選擇關鍵欄位,如:
這些欄位將作為特徵變數餵入 ML 模型,偵測異常偏離。
在 Web UI「System → Anomaly Detection」頁面:
以下範例示範如何根據異常分數(anomaly_score)將事件路由至專用 Stream,以便 Alert 或後續處理。先在 Graylog UI 中建立一個名為「Anomaly_Stream」的 Stream。
rule "Route events by anomaly score"
when
has_field("anomaly_score") &&
to_double($message.anomaly_score) > 75.0
then
route_to_stream("Anomaly_Stream");
set_field("anomaly_detected", true);
end
has_field("anomaly_score"):檢查事件是否包含異常分數欄位to_double(...) > 75.0:設定分數門檻為 75route_to_stream("Anomaly_Stream"):將符合條件的事件推送至該 Streamset_field("anomaly_detected", true):標記事件為已偵測異常以下以 Python 3.11+ 說明如何透過 Graylog REST API 建立、查詢與刪除自定義異常檢測器(Detector)。
import requests
GRAYLOG_URL = "http://<graylog-ip>:9000"
API_TOKEN = "YOUR_API_TOKEN"
headers = {
"Authorization": f"Bearer {API_TOKEN}",
"Content-Type": "application/json"
}
def create_detector(name: str, index_pattern: str) -> dict:
payload = {
"name": name,
"description": "自動化異常檢測器",
"time_field": "@timestamp",
"indices": [index_pattern],
"feature_attributes": [
{"type": "count", "field": "message"},
{"type": "mean", "field": "duration_ms"}
],
"detection_interval": {"period": {"interval": 5, "unit": "MINUTES"}},
"window_delay": {"period": {"interval": 1, "unit": "MINUTES"}}
}
resp = requests.post(
f"{GRAYLOG_URL}/api/plugins/anomaly_detection/detectors",
headers=headers, json=payload
)
return resp.json()
def list_detectors() -> dict:
resp = requests.get(
f"{GRAYLOG_URL}/api/plugins/anomaly_detection/detectors",
headers=headers
)
return resp.json()
def delete_detector(detector_id: str) -> None:
requests.delete(
f"{GRAYLOG_URL}/api/plugins/anomaly_detection/detectors/{detector_id}",
headers=headers
)
if __name__ == "__main__":
# 建立一個新的偵測器
new = create_detector("ExampleDetector", "graylog_*")
print("Created Detector ID:", new.get("id"))
# 列出所有偵測器
all_detectors = list_detectors()
print("Detectors List:", all_detectors)
# 刪除示範用偵測器
example_id = new.get("id")
if example_id:
delete_detector(example_id)
print(f"Deleted Detector ID: {example_id}")
feature_attributes:定義要監控的特徵欄位及統計方式detection_interval、window_delay:控制偵測執行與遲延時間Bearer Token 驗證,確保 API 權限